package com.cimu.rocketmq.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;

/**
 * Consumer side of the batch example.
 *
 * <p>Creates a push consumer, subscribes it to {@code TopicBatch}, and prints every list of
 * messages that is handed to its listener.
 *
 * @author cgx on 2020/1/16
 */
public class BatchConsumer {
    /** Consumer group name handed to the consumer's constructor. */
    private static final String CONSUMER_GROUP = "batch_consumer_name";

    /** Name server address the consumer is configured with. */
    private static final String NAME_SERVER_ADDRESS = "localhost:9876";

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "TopicBatch";

    /** Second argument passed to {@code subscribe} alongside the topic. */
    private static final String SUB_EXPRESSION = "*";

    /**
     * Configures the consumer, registers the printing listener, and starts it.
     *
     * @param args command-line arguments; not read
     * @throws InterruptedException declared in the signature and kept for compatibility
     * @throws MQClientException declared because the client calls used here may raise it
     */
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Build the consumer for the configured group and point it at the name server.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        pushConsumer.setNamesrvAddr(NAME_SERVER_ADDRESS);

        // Subscribe to the topic, then install the listener that handles deliveries.
        pushConsumer.subscribe(TOPIC, SUB_EXPRESSION);
        pushConsumer.registerMessageListener(printingListener());

        // Start the consumer and report that it is up.
        pushConsumer.start();
        System.out.printf("Consumer Started.%n");
    }

    /**
     * Builds the listener used by {@link #main(String[])}.
     *
     * @return a listener that prints the current thread name together with the received
     *     messages and always answers {@code CONSUME_SUCCESS}
     */
    private static MessageListenerConcurrently printingListener() {
        return (batch, consumeContext) -> {
            String threadName = Thread.currentThread().getName();
            System.out.printf("%s Receive New Messages: %s %n", threadName, batch);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        };
    }
}
